Estimating initial conditions parameters
In [10]:
Copied!
%matplotlib notebook
%matplotlib notebook
In [11]:
Copied!
%load_ext autoreload
%autoreload 3
%load_ext autoreload
%autoreload 3
The autoreload extension is already loaded. To reload it, use: %reload_ext autoreload
In [12]:
Copied!
import sys
import os
IN_COLAB = 'google.colab' in sys.modules
main_dir = '/content/PythonMathematicalModeling/' if IN_COLAB else ''
docs_dir = os.path.join(main_dir, 'docs')
project_dir = os.path.join(docs_dir, 'PlagueProject') if IN_COLAB else ''
data_private_folder = os.path.join(project_dir, 'data', 'private')
# GitHub credentials for Colab
USERNAME="" # Your GitHub username
GH_TOKEN="" # Your github token (Settings -> Developer Settings -> Personal Access Tokens -> Generate new token)
if IN_COLAB:
! rm -rf *
! git config --system credential.helper store
! echo "https://{USERNAME}:{GH_TOKEN}@github.com" > ~/.git-credentials
! git clone --recurse-submodules https://github.com/polislizarralde/PythonMathematicalModeling
sys.path += [main_dir, docs_dir, project_dir, data_private_folder]
import sys
import os
IN_COLAB = 'google.colab' in sys.modules
main_dir = '/content/PythonMathematicalModeling/' if IN_COLAB else ''
docs_dir = os.path.join(main_dir, 'docs')
project_dir = os.path.join(docs_dir, 'PlagueProject') if IN_COLAB else ''
data_private_folder = os.path.join(project_dir, 'data', 'private')
# GitHub credentials for Colab
USERNAME="" # Your GitHub username
GH_TOKEN="" # Your github token (Settings -> Developer Settings -> Personal Access Tokens -> Generate new token)
if IN_COLAB:
! rm -rf *
! git config --system credential.helper store
! echo "https://{USERNAME}:{GH_TOKEN}@github.com" > ~/.git-credentials
! git clone --recurse-submodules https://github.com/polislizarralde/PythonMathematicalModeling
sys.path += [main_dir, docs_dir, project_dir, data_private_folder]
In [13]:
Copied!
try:
from funct_process_data import * # Import all functions from funct_process_data.py
except ImportError:
if IN_COLAB:
! pip3 install -r PythonMathematicalModeling/requirements.txt
print('Stopping RUNTIME! Please run again.')
os.kill(os.getpid(), 9)
try:
from funct_process_data import * # Import all functions from funct_process_data.py
except ImportError:
if IN_COLAB:
! pip3 install -r PythonMathematicalModeling/requirements.txt
print('Stopping RUNTIME! Please run again.')
os.kill(os.getpid(), 9)
In [14]:
Copied!
# Set the working directory for private files
# Southeast Scania
southeast_path = os.path.join(data_private_folder, 'southeastScania.csv')
southeastScania = pd.read_csv(southeast_path, sep=',')
# Middle Scania
middle_path = os.path.join(data_private_folder, 'middleScania.csv')
middleScania = pd.read_csv(middle_path, sep=',')
# Southwest Scania
southwest_path = os.path.join(data_private_folder, 'southwestScania.csv')
southwestScania = pd.read_csv(southwest_path, sep=',')
# Concatenate all the South Scania data into one dataframe and reset the index
southScania = pd.concat([southeastScania, middleScania, southwestScania], ignore_index=True)
# Set the working directory for private files
# Southeast Scania
southeast_path = os.path.join(data_private_folder, 'southeastScania.csv')
southeastScania = pd.read_csv(southeast_path, sep=',')
# Middle Scania
middle_path = os.path.join(data_private_folder, 'middleScania.csv')
middleScania = pd.read_csv(middle_path, sep=',')
# Southwest Scania
southwest_path = os.path.join(data_private_folder, 'southwestScania.csv')
southwestScania = pd.read_csv(southwest_path, sep=',')
# Concatenate all the South Scania data into one dataframe and reset the index
southScania = pd.concat([southeastScania, middleScania, southwestScania], ignore_index=True)
Set the working directory for private files with detail information for some parishes
In [15]:
Copied!
# Southeast Scania
southeast_parishes_folder = os.path.join(data_private_folder, 'Deaths_parish', 'Plague', 'Southeast')
# Middle Scania
middle_parishes_folder = os.path.join(data_private_folder, 'Deaths_parish', 'Plague', 'Middle')
# Southwest Scania
southwest_parishes_folder = os.path.join(data_private_folder, 'Deaths_parish', 'Plague', 'Southwest')
# Southeast Scania
southeast_parishes_folder = os.path.join(data_private_folder, 'Deaths_parish', 'Plague', 'Southeast')
# Middle Scania
middle_parishes_folder = os.path.join(data_private_folder, 'Deaths_parish', 'Plague', 'Middle')
# Southwest Scania
southwest_parishes_folder = os.path.join(data_private_folder, 'Deaths_parish', 'Plague', 'Southwest')
Function to call the data by parish and transform the date to an appropiate format
In [16]:
Copied!
def get_parish_data(parish_name, parish_folder):
parish_path = os.path.join(parish_folder, parish_name + '.xlsx')
parish = pd.read_excel(parish_path, sheet_name='Plague')
# Convert 'EndDate' to datetime with appropriate format
parish['NewEndDate'] = pd.to_datetime(parish['EndDate'], format='%b %Y')
parish['NewEndDate'] = parish['NewEndDate'].dt.to_period('M')
parish['first_day'] = parish['NewEndDate'].dt.to_timestamp()
parish['last_day'] = parish['NewEndDate'].dt.to_timestamp(how='end')
# Add a column with the days since the first date and then cumsum
parish['Days'] = parish['last_day'].dt.daysinmonth
parish['Days'] = parish['Days'].cumsum()
return parish
def get_parish_data(parish_name, parish_folder):
parish_path = os.path.join(parish_folder, parish_name + '.xlsx')
parish = pd.read_excel(parish_path, sheet_name='Plague')
# Convert 'EndDate' to datetime with appropriate format
parish['NewEndDate'] = pd.to_datetime(parish['EndDate'], format='%b %Y')
parish['NewEndDate'] = parish['NewEndDate'].dt.to_period('M')
parish['first_day'] = parish['NewEndDate'].dt.to_timestamp()
parish['last_day'] = parish['NewEndDate'].dt.to_timestamp(how='end')
# Add a column with the days since the first date and then cumsum
parish['Days'] = parish['last_day'].dt.daysinmonth
parish['Days'] = parish['Days'].cumsum()
return parish
Initializing the population size (pop_parish) and the initial conditions for each variable (S0, E0, I0, R0, D0):
In [17]:
Copied!
def get_parish_info(parish_name, df: pd.DataFrame, column_name='ParishName', column_pop='BEF1699'):
pop_df = df[(df[column_name] == parish_name)][column_pop]
name_df = df[(df[column_name] == parish_name)][column_name]
if not pop_df.empty and not name_df.empty:
pop_parish = pop_df.values[0]
name_parish = name_df.values[0]
else:
pop_parish = None
name_parish = None
return pop_parish, name_parish
def get_parish_info(parish_name, df: pd.DataFrame, column_name='ParishName', column_pop='BEF1699'):
pop_df = df[(df[column_name] == parish_name)][column_pop]
name_df = df[(df[column_name] == parish_name)][column_name]
if not pop_df.empty and not name_df.empty:
pop_parish = pop_df.values[0]
name_parish = name_df.values[0]
else:
pop_parish = None
name_parish = None
return pop_parish, name_parish
In [22]:
Copied!
parish_file = get_parish_data('Ystad', southeast_parishes_folder)
parish = southScania[southScania['ParishName'] == 'YSTAD']
merged = pd.merge(parish_file, parish, on='ParishName', how='inner')
# # Convert WKT (Well-Known Text) geometry to Shapely geometry
# merged['geometry'] = merged['geometry'].apply(wkt.loads)
# # Create a GeoDataFrame from the DataFrame
# merged = gpd.GeoDataFrame(merged, geometry='geometry')
# merged
parish_file = get_parish_data('Ystad', southeast_parishes_folder)
parish = southScania[southScania['ParishName'] == 'YSTAD']
merged = pd.merge(parish_file, parish, on='ParishName', how='inner')
# # Convert WKT (Well-Known Text) geometry to Shapely geometry
# merged['geometry'] = merged['geometry'].apply(wkt.loads)
# # Create a GeoDataFrame from the DataFrame
# merged = gpd.GeoDataFrame(merged, geometry='geometry')
# merged
In [23]:
Copied!
parish_file
parish_file
Out[23]:
| ParishName | EndDate | Deaths | CumDeaths | NewEndDate | first_day | last_day | Days | |
|---|---|---|---|---|---|---|---|---|
| 0 | YSTAD | Jun 1712 | 26 | 26 | 1712-06 | 1712-06-01 | 1712-06-30 23:59:59.999999999 | 30 |
| 1 | YSTAD | Jul 1712 | 80 | 106 | 1712-07 | 1712-07-01 | 1712-07-31 23:59:59.999999999 | 61 |
| 2 | YSTAD | Aug 1712 | 303 | 409 | 1712-08 | 1712-08-01 | 1712-08-31 23:59:59.999999999 | 92 |
| 3 | YSTAD | Sep 1712 | 202 | 611 | 1712-09 | 1712-09-01 | 1712-09-30 23:59:59.999999999 | 122 |
| 4 | YSTAD | Oct 1712 | 84 | 695 | 1712-10 | 1712-10-01 | 1712-10-31 23:59:59.999999999 | 153 |
| 5 | YSTAD | Nov 1712 | 35 | 730 | 1712-11 | 1712-11-01 | 1712-11-30 23:59:59.999999999 | 183 |
| 6 | YSTAD | Dec 1712 | 5 | 735 | 1712-12 | 1712-12-01 | 1712-12-31 23:59:59.999999999 | 214 |
In [24]:
Copied!
parish
parish
Out[24]:
| Region | District(Härad) | ParishName | G_NAME | BEF1699 | BEF1718 | AV_BEF | ChurchBook | OtherSources | BeginPlaguePeriod | EndPlaguePeriod | VictimsNumber | geometry | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 18 | SOUTHEAST | HERRESTAD | YSTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
In [25]:
Copied!
merged
merged
Out[25]:
| ParishName | EndDate | Deaths | CumDeaths | NewEndDate | first_day | last_day | Days | Region | District(Härad) | G_NAME | BEF1699 | BEF1718 | AV_BEF | ChurchBook | OtherSources | BeginPlaguePeriod | EndPlaguePeriod | VictimsNumber | geometry | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | YSTAD | Jun 1712 | 26 | 26 | 1712-06 | 1712-06-01 | 1712-06-30 23:59:59.999999999 | 30 | SOUTHEAST | HERRESTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
| 1 | YSTAD | Jul 1712 | 80 | 106 | 1712-07 | 1712-07-01 | 1712-07-31 23:59:59.999999999 | 61 | SOUTHEAST | HERRESTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
| 2 | YSTAD | Aug 1712 | 303 | 409 | 1712-08 | 1712-08-01 | 1712-08-31 23:59:59.999999999 | 92 | SOUTHEAST | HERRESTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
| 3 | YSTAD | Sep 1712 | 202 | 611 | 1712-09 | 1712-09-01 | 1712-09-30 23:59:59.999999999 | 122 | SOUTHEAST | HERRESTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
| 4 | YSTAD | Oct 1712 | 84 | 695 | 1712-10 | 1712-10-01 | 1712-10-31 23:59:59.999999999 | 153 | SOUTHEAST | HERRESTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
| 5 | YSTAD | Nov 1712 | 35 | 730 | 1712-11 | 1712-11-01 | 1712-11-30 23:59:59.999999999 | 183 | SOUTHEAST | HERRESTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
| 6 | YSTAD | Dec 1712 | 5 | 735 | 1712-12 | 1712-12-01 | 1712-12-31 23:59:59.999999999 | 214 | SOUTHEAST | HERRESTAD | YSTADS | 1782 | 1156 | 1469.0 | 1 | 4 | JUN 1712 | DEC 1712 | 740 | POLYGON ((4240506.247557897 3176029.5812564003... |
In [16]:
Copied!
class Initial_Model:
def __init__(self, gdf, beta_guess:float, mu_guess:float, p_guess:float, df2: pd.DataFrame):
self.gdf = gdf
self.n = self.numPatches()
self.E0 = np.zeros(self.n)
self.I0 = np.zeros(self.n)
self.I0[0] = 1.0
self.R0 = np.zeros(self.n)
self.D0 = np.zeros(self.n)
self.S0 = np.zeros(self.n)
self.beta =np.full(self.n, beta_guess)# dimension = self.n and value for all the components = beta_guess
self.mu = np.full(self.n, mu_guess)
self.p = np.full(self.n, p_guess)
for i in range(self.n):
self.S0[i] = self.patchPop()[i] - self.E0[i] - \
self.I0[i] - self.R0[i]
def patchNames(self, column_name: str = 'ParishName'):
return self.gdf[column_name].values
def numPatches(self):
return len(self.patchNames())
def patchPop(self, column_pop: str = 'BEF1699'):
return self.gdf[column_pop].values
def maxDays(self, column_EndDays: str = 'EndDaysPlague'):
return self.gdf[column_EndDays].max()
class Initial_Model:
def __init__(self, gdf, beta_guess:float, mu_guess:float, p_guess:float, df2: pd.DataFrame):
self.gdf = gdf
self.n = self.numPatches()
self.E0 = np.zeros(self.n)
self.I0 = np.zeros(self.n)
self.I0[0] = 1.0
self.R0 = np.zeros(self.n)
self.D0 = np.zeros(self.n)
self.S0 = np.zeros(self.n)
self.beta =np.full(self.n, beta_guess)# dimension = self.n and value for all the components = beta_guess
self.mu = np.full(self.n, mu_guess)
self.p = np.full(self.n, p_guess)
for i in range(self.n):
self.S0[i] = self.patchPop()[i] - self.E0[i] - \
self.I0[i] - self.R0[i]
def patchNames(self, column_name: str = 'ParishName'):
return self.gdf[column_name].values
def numPatches(self):
return len(self.patchNames())
def patchPop(self, column_pop: str = 'BEF1699'):
return self.gdf[column_pop].values
def maxDays(self, column_EndDays: str = 'EndDaysPlague'):
return self.gdf[column_EndDays].max()
Getting the centroid of each polygon for defining the transmission matrix.
In [16]:
Copied!
southeastScania = get_centroid(southeastScania)
southeastScania = get_centroid(southeastScania)
Generating the differential equations
In [18]:
Copied!
def SEIRD_model(y, t, model_parameters: tuple[dict]):
parameters = model_parameters[0]
gdf = parameters['gdf']
beta = parameters['beta']
p = parameters['p']
gamma = parameters['gamma']
sigma = parameters['sigma']
bump_center = parameters['bump_center']
bump_width = parameters['bump_width']
bump_height = parameters['bump_height']
mu = parameters['mu']
N = parameters['N']
n = parameters['n']
vars = y
def entryfun(i, offset): return vars[5 * i + offset]
# Create an array from the entry function
entry = np.array([[entryfun(i, j) for j in range(5)] for i in range(len(vars) // 5)])
beta_matrix = transmission_matrix_beta(gdf)
p_matrix = transmission_matrix_p(gdf)
seasonal_rate = lambda t : seasonal_transmission_rate(t, bump_center, bump_width, bump_height)
matrix = lambda t : (beta + seasonal_rate(t)) * beta_matrix + (p + seasonal_rate(t)) * p_matrix
sum_transmission = lambda t : np.sum(matrix(t) * entry[:, 2], axis=1)
dS = -entry[:, 0] / N * sum_transmission(t)
dE = entry[:, 0] / N * sum_transmission(t) - sigma * entry[:, 1]
dI = sigma * entry[:, 1] - gamma * entry[:, 2]
dR = gamma * (1 - mu) * entry[:, 2]
dD = gamma * mu * entry[:, 2]
derivatives = np.stack((dS, dE, dI, dR, dD), axis=1).flatten()
# for i in range(n):
# sum_transmission = sum( matrix(t)[i][j] * entry(j, 2) for j in range(n))
# dS[i] = - entry(i, 0) / N[i] * sum_transmission
# dE[i] = entry(i, 0) / N[i] * sum_transmission - sigma * entry(i, 1)
# dI[i] = sigma * entry(i, 1) - gamma * entry(i, 2)
# dR[i] = gamma * (1 - mu) * entry(i, 2)
# dD[i] = gamma * mu * entry(i, 2)
# derivates = [val for i in range(n) for val in (
# dS[i], dE[i], dI[i], dR[i], dD[i])]
return derivatives
def generate_sol(genInput: dict) -> dict:
init = genInput['init']
n = genInput['n']
initConditions = [val for i in range(n) for val in (
init['S'][i], init['E'][i], init['I'][i], init['R'][i], init['D'][i])]
T = genInput['T']
t = np.linspace(0, T, T+1)
model = genInput['model']
solution = scipy.odeint(model, initConditions, t, args=((genInput,),))
indexVar = {'S': 0, 'E': 1, 'I': 2, 'R': 3, 'D': 4}
def varSol(patch, var): return solution[:, 5*patch + indexVar[var]]
return {var: {patch: varSol(patch, var) for patch in range(n)} for var in indexVar.keys()}
def SEIRD_model(y, t, model_parameters: tuple[dict]):
parameters = model_parameters[0]
gdf = parameters['gdf']
beta = parameters['beta']
p = parameters['p']
gamma = parameters['gamma']
sigma = parameters['sigma']
bump_center = parameters['bump_center']
bump_width = parameters['bump_width']
bump_height = parameters['bump_height']
mu = parameters['mu']
N = parameters['N']
n = parameters['n']
vars = y
def entryfun(i, offset): return vars[5 * i + offset]
# Create an array from the entry function
entry = np.array([[entryfun(i, j) for j in range(5)] for i in range(len(vars) // 5)])
beta_matrix = transmission_matrix_beta(gdf)
p_matrix = transmission_matrix_p(gdf)
seasonal_rate = lambda t : seasonal_transmission_rate(t, bump_center, bump_width, bump_height)
matrix = lambda t : (beta + seasonal_rate(t)) * beta_matrix + (p + seasonal_rate(t)) * p_matrix
sum_transmission = lambda t : np.sum(matrix(t) * entry[:, 2], axis=1)
dS = -entry[:, 0] / N * sum_transmission(t)
dE = entry[:, 0] / N * sum_transmission(t) - sigma * entry[:, 1]
dI = sigma * entry[:, 1] - gamma * entry[:, 2]
dR = gamma * (1 - mu) * entry[:, 2]
dD = gamma * mu * entry[:, 2]
derivatives = np.stack((dS, dE, dI, dR, dD), axis=1).flatten()
# for i in range(n):
# sum_transmission = sum( matrix(t)[i][j] * entry(j, 2) for j in range(n))
# dS[i] = - entry(i, 0) / N[i] * sum_transmission
# dE[i] = entry(i, 0) / N[i] * sum_transmission - sigma * entry(i, 1)
# dI[i] = sigma * entry(i, 1) - gamma * entry(i, 2)
# dR[i] = gamma * (1 - mu) * entry(i, 2)
# dD[i] = gamma * mu * entry(i, 2)
# derivates = [val for i in range(n) for val in (
# dS[i], dE[i], dI[i], dR[i], dD[i])]
return derivatives
def generate_sol(genInput: dict) -> dict:
init = genInput['init']
n = genInput['n']
initConditions = [val for i in range(n) for val in (
init['S'][i], init['E'][i], init['I'][i], init['R'][i], init['D'][i])]
T = genInput['T']
t = np.linspace(0, T, T+1)
model = genInput['model']
solution = scipy.odeint(model, initConditions, t, args=((genInput,),))
indexVar = {'S': 0, 'E': 1, 'I': 2, 'R': 3, 'D': 4}
def varSol(patch, var): return solution[:, 5*patch + indexVar[var]]
return {var: {patch: varSol(patch, var) for patch in range(n)} for var in indexVar.keys()}
Trying a small dataframe
In [16]:
Copied!
# k = 6
# example = southeastScania.head(k)
# example.shape
# model_input = Initial_Model(example)
# k = 6
# example = southeastScania.head(k)
# example.shape
# model_input = Initial_Model(example)
In [17]:
Copied!
# Model_test = {'model': SEIRD_model,
# 'init': {
# 'S': model_input.S0,
# 'E': model_input.E0,
# 'I': model_input.I0,
# 'R': model_input.R0,
# 'D': model_input.D0,
# }, # defining the initial values for the model
# 'gdf': example, # defining the graph
# 'beta': 0.3,
# 'p': 0.1,
# 'bump_center': 0.1,
# 'bump_width': 180.0,
# 'bump_height': 30.0,
# 'gamma': 0.06,
# 'sigma': 0.02,
# 'mu': 0.2,
# 'N': model_input.patchPop(),
# 'n': model_input.n,
# 'T': model_input.maxDays()}
# model_dict = generate_sol(Model_test)
# Model_test = {'model': SEIRD_model,
# 'init': {
# 'S': model_input.S0,
# 'E': model_input.E0,
# 'I': model_input.I0,
# 'R': model_input.R0,
# 'D': model_input.D0,
# }, # defining the initial values for the model
# 'gdf': example, # defining the graph
# 'beta': 0.3,
# 'p': 0.1,
# 'bump_center': 0.1,
# 'bump_width': 180.0,
# 'bump_height': 30.0,
# 'gamma': 0.06,
# 'sigma': 0.02,
# 'mu': 0.2,
# 'N': model_input.patchPop(),
# 'n': model_input.n,
# 'T': model_input.maxDays()}
# model_dict = generate_sol(Model_test)
In [18]:
Copied!
# %matplotlib inline
# # Set up the data to fit
# beginTime = southeastScania['BeginDaysPlague'].values
# endTime = southeastScania['EndDaysPlague'].values
# deathData = southeastScania['VictimsNumber'].values
# # Number of patches
# n = Model_test['n']
# # Set the figsize for each subplot
# figsize_single_subplot = (8, 2)
# # Calculate the total figure height based on the number of subplots and their height
# fig_height = figsize_single_subplot[1] * n
# # Create a figure and an array of axes with nrows=n and ncols=1
# fig, axes = plt.subplots(nrows=n, ncols=1, figsize=(
# figsize_single_subplot[0], fig_height), sharex=False)
# # Plot model solution D for each patch
# for i in range(n):
# if deathData[i] != 0 and endTime[i] != 0:
# initial_position = beginTime[i]
# final_position = endTime[i]
# axes[i].plot(initial_position, 0, 'bo')
# axes[i].plot(final_position,
# deathData[i], 'bo')
# axes[i].plot(model_dict['D'][i], color='orange', label=(model_input.patchNames()[i]))
# axes[i].set_ylabel('Cumulative Deaths')
# else:
# axes[i].plot(model_dict['D'][i],
# color='orange', label=(model_input.patchNames()[i]))
# axes[i].set_ylabel('Cumulative Deaths')
# axes[i].legend(loc='lower right')
# # Adjust the layout to avoid overlapping
# plt.tight_layout()
# plt.show()
# %matplotlib inline
# # Set up the data to fit
# beginTime = southeastScania['BeginDaysPlague'].values
# endTime = southeastScania['EndDaysPlague'].values
# deathData = southeastScania['VictimsNumber'].values
# # Number of patches
# n = Model_test['n']
# # Set the figsize for each subplot
# figsize_single_subplot = (8, 2)
# # Calculate the total figure height based on the number of subplots and their height
# fig_height = figsize_single_subplot[1] * n
# # Create a figure and an array of axes with nrows=n and ncols=1
# fig, axes = plt.subplots(nrows=n, ncols=1, figsize=(
# figsize_single_subplot[0], fig_height), sharex=False)
# # Plot model solution D for each patch
# for i in range(n):
# if deathData[i] != 0 and endTime[i] != 0:
# initial_position = beginTime[i]
# final_position = endTime[i]
# axes[i].plot(initial_position, 0, 'bo')
# axes[i].plot(final_position,
# deathData[i], 'bo')
# axes[i].plot(model_dict['D'][i], color='orange', label=(model_input.patchNames()[i]))
# axes[i].set_ylabel('Cumulative Deaths')
# else:
# axes[i].plot(model_dict['D'][i],
# color='orange', label=(model_input.patchNames()[i]))
# axes[i].set_ylabel('Cumulative Deaths')
# axes[i].legend(loc='lower right')
# # Adjust the layout to avoid overlapping
# plt.tight_layout()
# plt.show()
Defining the optimization problem:
In [19]:
Copied!
model_input = Initial_Model(southeastScania)
model_input = Initial_Model(southeastScania)
In [20]:
Copied!
# Define the objective function to minimize (sum of squared errors)
def objectiveFunction(parameters, beginTime, endTime, deathData):
beta, p, bump_center, bump_width, bump_height = parameters
model_info = {'model': SEIRD_model,
'init': {
'S': model_input.S0,
'E': model_input.E0,
'I': model_input.I0,
'R': model_input.R0,
'D': model_input.D0,
},
'gdf': southeastScania,
# defining the initial values for the model
'beta': model_input.beta,
'p': p,
'mu': model_input.mu,
'bump_center': bump_center,
'bump_width': bump_width,
'bump_height': bump_height,
'gamma': 0.06,
'sigma': 0.02,
'N': model_input.patchPop(),
'n': model_input.n,
'T': model_input.maxDays()}
model_sol = generate_sol(model_info)
totalError = 0
n = model_info['n']
# Calculate the error for each patch
errors = np.zeros(n)
for i in range(n):
initial_position = beginTime[i]
final_position = endTime[i]
if (deathData[i] != 0 and final_position != 0):
try:
errors[i] = 0.7 * ((model_sol['D'][i][initial_position] - 1.0)**2 + (
model_sol['D'][i][final_position] - deathData[i])**2)
except:
print(
f"Error at: n={n}, i={i}, final_position={final_position}, len(model_sol['D'])= {len(model_sol['D'])}, model_sol['D'][i] = {model_sol['D'][i]}, deathData[i] = {deathData[i]}")
else:
errors[i] = 0.3 * ((model_sol['D'][i][initial_position] - 1.0)**2)
# Calculate the total error
totalError = np.sum(errors)
return totalError
# Define the objective function to minimize (sum of squared errors)
def objectiveFunction(parameters, beginTime, endTime, deathData):
beta, p, bump_center, bump_width, bump_height = parameters
model_info = {'model': SEIRD_model,
'init': {
'S': model_input.S0,
'E': model_input.E0,
'I': model_input.I0,
'R': model_input.R0,
'D': model_input.D0,
},
'gdf': southeastScania,
# defining the initial values for the model
'beta': model_input.beta,
'p': p,
'mu': model_input.mu,
'bump_center': bump_center,
'bump_width': bump_width,
'bump_height': bump_height,
'gamma': 0.06,
'sigma': 0.02,
'N': model_input.patchPop(),
'n': model_input.n,
'T': model_input.maxDays()}
model_sol = generate_sol(model_info)
totalError = 0
n = model_info['n']
# Calculate the error for each patch
errors = np.zeros(n)
for i in range(n):
initial_position = beginTime[i]
final_position = endTime[i]
if (deathData[i] != 0 and final_position != 0):
try:
errors[i] = 0.7 * ((model_sol['D'][i][initial_position] - 1.0)**2 + (
model_sol['D'][i][final_position] - deathData[i])**2)
except:
print(
f"Error at: n={n}, i={i}, final_position={final_position}, len(model_sol['D'])= {len(model_sol['D'])}, model_sol['D'][i] = {model_sol['D'][i]}, deathData[i] = {deathData[i]}")
else:
errors[i] = 0.3 * ((model_sol['D'][i][initial_position] - 1.0)**2)
# Calculate the total error
totalError = np.sum(errors)
return totalError
Parameter estimation
In [21]:
Copied!
# Set up the data to fit
# beginTime = southeastScania['BeginDaysPlague'].values
# endTime = southeastScania['EndDaysPlague'].values
# deathData = southeastScania['VictimsNumber'].values
# # Choose initial guesses for the parameters to fit
# beta_guess = 0.3
# p_guess = 0.1
# bump_center_guess = 0.1
# bump_width_guess = 180.0
# bump_height_guess = 30.0
# # Minimize the objective function to obtain estimates for beta and gamma
# result = optimize.minimize(objectiveFunction, x0=(beta_guess, p_guess, bump_center_guess, bump_width_guess, bump_height_guess), args=(beginTime, endTime, deathData),
# method='L-BFGS-B'
# # ,bounds=[(0, 1), (0, 1), (0, 10), (-2, 2), (-10, 10)]
# )
# beta_estimated, p_estimated, bump_center_estimated, bump_width_estimated, bump_height_estimated = result.x
# print("beta = ", beta_estimated)
# print("p = ", p_estimated)
# print("bump_center = ", bump_center_estimated)
# print("bump_width = ", bump_width_estimated)
# print("bump_height = ", bump_height_estimated)
# Set up the data to fit
# beginTime = southeastScania['BeginDaysPlague'].values
# endTime = southeastScania['EndDaysPlague'].values
# deathData = southeastScania['VictimsNumber'].values
# # Choose initial guesses for the parameters to fit
# beta_guess = 0.3
# p_guess = 0.1
# bump_center_guess = 0.1
# bump_width_guess = 180.0
# bump_height_guess = 30.0
# # Minimize the objective function to obtain estimates for beta and gamma
# result = optimize.minimize(objectiveFunction, x0=(beta_guess, p_guess, bump_center_guess, bump_width_guess, bump_height_guess), args=(beginTime, endTime, deathData),
# method='L-BFGS-B'
# # ,bounds=[(0, 1), (0, 1), (0, 10), (-2, 2), (-10, 10)]
# )
# beta_estimated, p_estimated, bump_center_estimated, bump_width_estimated, bump_height_estimated = result.x
# print("beta = ", beta_estimated)
# print("p = ", p_estimated)
# print("bump_center = ", bump_center_estimated)
# print("bump_width = ", bump_width_estimated)
# print("bump_height = ", bump_height_estimated)
Results from estimations
In [21]:
Copied!
# # Set up the data to fit
beginTime = southeastScania['BeginDaysPlague'].values
endTime = southeastScania['EndDaysPlague'].values
deathData = southeastScania['VictimsNumber'].values
# #result estimation considering seasonality in p and beta, considering (p+seasonality) and (beta+seasonality)
beta_estimated = 0.29999990413218025
p_estimated = 0.10000015278925331
bump_center_estimated = 0.1000005503832051
bump_width_estimated = 180.00000012694866
bump_height_estimated = 30.00000035610224
# #result estimation considering seasonality only (beta + seasonality)
# beta_estimated = 0.3574870151324586
# p_estimated = 0.07419604016603554
# bump_center_estimated = 0.10388586284456384
# bump_width_estimated = 180.00730607111754
# bump_height_estimated = 30.01454878377533
# #result estimation without considering seasonality
# beta_estimated = 0.6159244597903136
# p_estimated = -0.062116989387247884
# bump_center_estimated = 0.1
# bump_width_estimated = 180.0
# bump_height_estimated = 30.0
# # Set up the data to fit
beginTime = southeastScania['BeginDaysPlague'].values
endTime = southeastScania['EndDaysPlague'].values
deathData = southeastScania['VictimsNumber'].values
# #result estimation considering seasonality in p and beta, considering (p+seasonality) and (beta+seasonality)
beta_estimated = 0.29999990413218025
p_estimated = 0.10000015278925331
bump_center_estimated = 0.1000005503832051
bump_width_estimated = 180.00000012694866
bump_height_estimated = 30.00000035610224
# #result estimation considering seasonality only (beta + seasonality)
# beta_estimated = 0.3574870151324586
# p_estimated = 0.07419604016603554
# bump_center_estimated = 0.10388586284456384
# bump_width_estimated = 180.00730607111754
# bump_height_estimated = 30.01454878377533
# #result estimation without considering seasonality
# beta_estimated = 0.6159244597903136
# p_estimated = -0.062116989387247884
# bump_center_estimated = 0.1
# bump_width_estimated = 180.0
# bump_height_estimated = 30.0
Substituting the estimated values into the model and solving it
In [22]:
Copied!
model_estimation = {'model': SEIRD_model,
'init': {
'S': model_input.S0,
'E': model_input.E0,
'I': model_input.I0,
'R': model_input.R0,
'D': model_input.D0,
},
'gdf': southeastScania,
# defining the initial values for the model
'beta': beta_estimated,
'p': p_estimated,
'bump_center': bump_center_estimated,
'bump_width': bump_width_estimated,
'bump_height': bump_height_estimated,
'gamma': 0.06,
'sigma': 0.02,
'mu': 0.2,
'N': model_input.patchPop(),
'n': model_input.n,
'T': model_input.maxDays()}
model_solution = generate_sol(model_estimation)
model_estimation = {'model': SEIRD_model,
'init': {
'S': model_input.S0,
'E': model_input.E0,
'I': model_input.I0,
'R': model_input.R0,
'D': model_input.D0,
},
'gdf': southeastScania,
# defining the initial values for the model
'beta': beta_estimated,
'p': p_estimated,
'bump_center': bump_center_estimated,
'bump_width': bump_width_estimated,
'bump_height': bump_height_estimated,
'gamma': 0.06,
'sigma': 0.02,
'mu': 0.2,
'N': model_input.patchPop(),
'n': model_input.n,
'T': model_input.maxDays()}
model_solution = generate_sol(model_estimation)
Plotting the cumulative number of deaths to check the model fit
In [19]:
Copied!
southeastScania['BeginPlaguePeriod'][1]
southeastScania['BeginPlaguePeriod'][1]
Out[19]:
'1711-10-01 00:00:00'
In [24]:
Copied!
%matplotlib inline
# Number of patches
n = model_estimation['n']
# Set the figsize for each subplot
figsize_single_subplot = (8, 2)
# Calculate the total figure height based on the number of subplots and their height
fig_height = figsize_single_subplot[1] * n
# Create a figure and an array of axes with nrows=n and ncols=1
fig, axes = plt.subplots(nrows=n, ncols=1, figsize=(
figsize_single_subplot[0], fig_height), sharex=False)
tick_positions = southeastScania['BeginDaysPlague'].values
tick_labels = southeastScania['BeginPlaguePeriod'].apply(lambda x: x.strftime('%b %Y') if not pd.isna(x) else None).values
# Dictionary that reduces the plotting to those plots with data
#lookup_index = [1, 2, 4, 8, 9, 12, 16, 17]
# Plot model solution D for each patch
for i in range(n):
if deathData[i] != 0 and endTime[i] != 0:
initial_position = beginTime[i]
final_position = endTime[i]
axes[i].plot(initial_position, 0, 'bo')
axes[i].plot(final_position,
deathData[i], 'bo')
axes[i].plot(model_solution['D'][i], color='orange', label=(model_input.patchNames()[i]))
axes[i].set_ylabel('Cumulative Deaths')
axes[i].legend(loc = 'lower right')
axes[i].set_xticks(tick_positions, tick_labels,
rotation=70, fontsize=9)
else:
axes[i].plot(model_solution['D'][i],
color='orange', label=(model_input.patchNames()[i]))
axes[i].set_ylabel('Cumulative Deaths')
axes[i].legend(loc='lower right')
axes[i].set_xticks(tick_positions, tick_labels,
rotation=70, fontsize=9)
# Adjust the layout to avoid overlapping
plt.tight_layout()
plt.show()
%matplotlib inline
# Number of patches
n = model_estimation['n']
# Set the figsize for each subplot
figsize_single_subplot = (8, 2)
# Calculate the total figure height based on the number of subplots and their height
fig_height = figsize_single_subplot[1] * n
# Create a figure and an array of axes with nrows=n and ncols=1
fig, axes = plt.subplots(nrows=n, ncols=1, figsize=(
figsize_single_subplot[0], fig_height), sharex=False)
tick_positions = southeastScania['BeginDaysPlague'].values
tick_labels = southeastScania['BeginPlaguePeriod'].apply(lambda x: x.strftime('%b %Y') if not pd.isna(x) else None).values
# Dictionary that reduces the plotting to those plots with data
#lookup_index = [1, 2, 4, 8, 9, 12, 16, 17]
# Plot model solution D for each patch
for i in range(n):
if deathData[i] != 0 and endTime[i] != 0:
initial_position = beginTime[i]
final_position = endTime[i]
axes[i].plot(initial_position, 0, 'bo')
axes[i].plot(final_position,
deathData[i], 'bo')
axes[i].plot(model_solution['D'][i], color='orange', label=(model_input.patchNames()[i]))
axes[i].set_ylabel('Cumulative Deaths')
axes[i].legend(loc = 'lower right')
axes[i].set_xticks(tick_positions, tick_labels,
rotation=70, fontsize=9)
else:
axes[i].plot(model_solution['D'][i],
color='orange', label=(model_input.patchNames()[i]))
axes[i].set_ylabel('Cumulative Deaths')
axes[i].legend(loc='lower right')
axes[i].set_xticks(tick_positions, tick_labels,
rotation=70, fontsize=9)
# Adjust the layout to avoid overlapping
plt.tight_layout()
plt.show()
Plotting the daily deaths by parish
In [25]:
Copied!
# Computing the daily deaths from the model output for a given patch
def daily_deaths(solution_model: dict, patch: int, T_inf: int, T_sup: int) -> list[float]:
assert T_inf >= 0, "t must be greater than 0"
assert T_sup >= 0, "t must be greater than 0"
cumulative_deaths = solution_model['D'][patch] # list of floats
return [cumulative_deaths[t+1] - cumulative_deaths[t]
for t in range(T_inf, T_sup)]
# Computing the daily deaths from the model output for a given patch
def daily_deaths(solution_model: dict, patch: int, T_inf: int, T_sup: int) -> list[float]:
assert T_inf >= 0, "t must be greater than 0"
assert T_sup >= 0, "t must be greater than 0"
cumulative_deaths = solution_model['D'][patch] # list of floats
return [cumulative_deaths[t+1] - cumulative_deaths[t]
for t in range(T_inf, T_sup)]
In [28]:
Copied!
%matplotlib inline
# Number of patches
n = model_estimation['n']
# Set the figsize for each subplot
figsize_single_subplot = (8, 2)
# Calculate the total figure height based on the number of subplots and their height
fig_height = figsize_single_subplot[1] * n
# Create a figure and an array of axes with nrows=n and ncols=1
fig, axes = plt.subplots(nrows=n, ncols=1, figsize=(
figsize_single_subplot[0], fig_height))
# tick_positions = southeastScania['BeginDaysPlague'].values
# tick_labels = southeastScania['BeginPlaguePeriod'].apply(
# lambda x: x.strftime('%b %Y') if not pd.isna(x) else None).values
for i in range(n):
# Plot daily deaths for each patch i
axes[i].plot(daily_deaths(model_solution, i, 0, model_input.maxDays()),
color='blue', label=(model_input.patchNames()[i]))
axes[i].set_ylabel('Daily Deaths')
axes[i].legend(loc='upper right')
axes[i].xaxis.set_ticks(tick_positions, tick_labels,
rotation=70, fontsize=9)
# Adjust the layout to avoid overlapping
plt.tight_layout()
plt.show()
%matplotlib inline
# Number of patches
n = model_estimation['n']
# Set the figsize for each subplot
figsize_single_subplot = (8, 2)
# Calculate the total figure height based on the number of subplots and their height
fig_height = figsize_single_subplot[1] * n
# Create a figure and an array of axes with nrows=n and ncols=1
fig, axes = plt.subplots(nrows=n, ncols=1, figsize=(
figsize_single_subplot[0], fig_height))
# tick_positions = southeastScania['BeginDaysPlague'].values
# tick_labels = southeastScania['BeginPlaguePeriod'].apply(
# lambda x: x.strftime('%b %Y') if not pd.isna(x) else None).values
for i in range(n):
# Plot daily deaths for each patch i
axes[i].plot(daily_deaths(model_solution, i, 0, model_input.maxDays()),
color='blue', label=(model_input.patchNames()[i]))
axes[i].set_ylabel('Daily Deaths')
axes[i].legend(loc='upper right')
axes[i].xaxis.set_ticks(tick_positions, tick_labels,
rotation=70, fontsize=9)
# Adjust the layout to avoid overlapping
plt.tight_layout()
plt.show()